6a7a7df7ea9f341ab4c75adc31d599e8ba909347,com.ibm.streamsx.hdfs/impl/java/src/com/ibm/streamsx/hdfs/HDFS2FileSink.java,HDFS2FileSink,checkParameters,#OperatorContextChecker#,349
Before Change
// Flag the context as invalid when bytesPerFile, tuplesPerFile or timePerFile
// is negative. Only the first entry of each non-empty value list is inspected.
if (!bytesPerFileVal.isEmpty()
        && Long.valueOf(bytesPerFileVal.get(0)) < 0) {
    checker.setInvalidContext(
            "Operator parameter bytesPerFile value should not be less than 0.",
            null);
}
if (!tuplesPerFileVal.isEmpty()
        && Long.valueOf(tuplesPerFileVal.get(0)) < 0) {
    checker.setInvalidContext(
            "Operator parameter tuplesPerFile value should not be less than 0.",
            null);
}
// The time value is parsed as a float, unlike the two integral limits above.
if (!timeForFileVal.isEmpty()
        && Float.valueOf(timeForFileVal.get(0)) < 0.0) {
    checker.setInvalidContext(
            "Operator parameter timePerFile value should not be less than 0.",
            null);
}
// Resolve the indexes of the data attribute and the optional file-name
// attribute on input port 0. fileAttribute stays -1 when the file-name
// attribute parameter is not set.
int fileAttribute = -1;
int dataAttribute = 0;
StreamSchema inputSchema = checker.getOperatorContext().getStreamingInputs()
        .get(0).getStreamSchema();
if (checker.getOperatorContext().getParameterNames()
        .contains(IHdfsConstants.PARAM_FILE_NAME_ATTR)) {
    String fileNameAttr = checker.getOperatorContext()
            .getParameterValues(IHdfsConstants.PARAM_FILE_NAME_ATTR).get(0);
    // NOTE(review): no guard here for a parameter value that names an
    // attribute missing from the schema - confirm what getAttribute(String)
    // returns in that case.
    fileAttribute = inputSchema.getAttribute(fileNameAttr).getIndex();
    // Data is read from attribute 0 unless the file name occupies that
    // slot, in which case attribute 1 is used instead.
    // NOTE(review): assumes the schema has a second attribute in that case
    // - TODO confirm.
    dataAttribute = (fileAttribute == 0) ? 1 : 0;
}
// The data attribute must be one of the string types or a blob.
MetaType dataType = inputSchema.getAttribute(dataAttribute).getType()
        .getMetaType();
if (!(dataType == MetaType.RSTRING || dataType == MetaType.USTRING
        || dataType == MetaType.BLOB)) {
    checker.setInvalidContext(
            "The data attribute must have type ustring, rstring, or blob. Found attribute of type "
                    + dataType, null);
}
// Validate the file-name attribute only when one was configured
// (fileAttribute is -1 otherwise).
if (fileAttribute != -1) {
    // Look the attribute up by its resolved index: the file-name attribute
    // is not necessarily at position 1 (it sits at 0 when the data
    // attribute is at 1), so a fixed index would validate - and report -
    // the wrong attribute.
    MetaType fileAttributeType = inputSchema.getAttribute(fileAttribute)
            .getType().getMetaType();
    // File names must be carried in a string attribute.
    if (fileAttributeType != MetaType.RSTRING
            && fileAttributeType != MetaType.USTRING) {
        checker.setInvalidContext(
                "Expected attribute of type rstring or ustring on input port, found attribute of type "
                        + fileAttributeType, null);
    }
}
}
After Change
// Flag the context as invalid when one of the three per-file limit values
// checked below is negative. Only the first entry of each non-empty value
// list is inspected.
if (!bytesPerFileVal.isEmpty()
        && Long.valueOf(bytesPerFileVal.get(0)) < 0) {
    checker.setInvalidContext(
            Messages.getString("HDFS_SINK_INVALID_VALUE_BYTEPERFILE"),
            null);
}
if (!tuplesPerFileVal.isEmpty()
        && Long.valueOf(tuplesPerFileVal.get(0)) < 0) {
    checker.setInvalidContext(
            Messages.getString("HDFS_SINK_INVALID_VALUE_TUPLESPERFILE"),
            null);
}
// The time value is parsed as a float, unlike the two integral limits above.
// NOTE(review): the message key below ends in "TIMEPERFIL" - confirm it
// matches the key defined in the message resource bundle.
if (!timeForFileVal.isEmpty()
        && Float.valueOf(timeForFileVal.get(0)) < 0.0) {
    checker.setInvalidContext(
            Messages.getString("HDFS_SINK_INVALID_VALUE_TIMEPERFIL"),
            null);
}
// Resolve the indexes of the data attribute and the optional file-name
// attribute on input port 0. fileAttribute stays -1 when the file-name
// attribute parameter is not set.
int fileAttribute = -1;
int dataAttribute = 0;
StreamSchema inputSchema = checker.getOperatorContext().getStreamingInputs()
        .get(0).getStreamSchema();
if (checker.getOperatorContext().getParameterNames()
        .contains(IHdfsConstants.PARAM_FILE_NAME_ATTR)) {
    String fileNameAttr = checker.getOperatorContext()
            .getParameterValues(IHdfsConstants.PARAM_FILE_NAME_ATTR).get(0);
    // NOTE(review): no guard here for a parameter value that names an
    // attribute missing from the schema - confirm what getAttribute(String)
    // returns in that case.
    fileAttribute = inputSchema.getAttribute(fileNameAttr).getIndex();
    // Data is read from attribute 0 unless the file name occupies that
    // slot, in which case attribute 1 is used instead.
    // NOTE(review): assumes the schema has a second attribute in that case
    // - TODO confirm.
    dataAttribute = (fileAttribute == 0) ? 1 : 0;
}
// The data attribute must be one of the string types or a blob.
MetaType dataType = inputSchema.getAttribute(dataAttribute).getType()
        .getMetaType();
if (!(dataType == MetaType.RSTRING || dataType == MetaType.USTRING
        || dataType == MetaType.BLOB)) {
    checker.setInvalidContext(
            Messages.getString("HDFS_SINK_INVALID_DATA_ATTR_TYPE", dataType),
            null);
}
// Validate the file-name attribute only when one was configured
// (fileAttribute is -1 otherwise).
if (fileAttribute != -1) {
    // Look the attribute up by its resolved index: the file-name attribute
    // is not necessarily at position 1 (it sits at 0 when the data
    // attribute is at 1), so a fixed index would validate - and report -
    // the wrong attribute.
    MetaType fileAttributeType = inputSchema.getAttribute(fileAttribute)
            .getType().getMetaType();
    // File names must be carried in a string attribute.
    if (fileAttributeType != MetaType.RSTRING
            && fileAttributeType != MetaType.USTRING) {
        checker.setInvalidContext(
                Messages.getString("HDFS_SINK_INVALID_ATTR_FILENAME", fileAttributeType),
                null);
    }
}